package com.uxsino.reactorq.commons;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

import reactor.core.publisher.Flux;

/**
 * a {@link Flux} dispatches messsage from JMS Topic. see {@link TopicConnection}
 *
 * @param <T>
 */
public final class TopicFlux<T> extends JMSFlux<T> {
    // private static Logger logger = LoggerFactory.getLogger(TopicFlux.class);

    /**
     * create a TopicFlux
     * @param connection the connection to bind
     * @param topic topic name
     * @param msgClass class of message
     * @param receiverId if the message should be filtered by receiverId, null mean not filter
     * @throws JMSException
     */
    public TopicFlux(TopicConnection connection, String topic, Class<? extends T> msgClass, String receiverId)
        throws JMSException {
        super(connection, topic, msgClass, receiverId);
    }

    /**
     * create a TopicFlux with specific ack mode
     * @param connection the connection to bind
     * @param topic topic name
     * @param msgClass class of message
     * @param receiverId if the message should be filtered by receiverId, null mean not filter
     * @param acknowledge true to use CLIENT_ACKNOWLEDGE mode
     * @throws JMSException
     */
    public TopicFlux(TopicConnection connection, String topic, Class<? extends T> msgClass, String receiverId,
        boolean acknowledge) throws JMSException {
        super(connection, topic, msgClass, receiverId, acknowledge);
    }

    @Override
    protected Session openSession(Connection conn) throws JMSException {
        return ((TopicConnection) connection).createTopicSession(false,
            this.acknowledge ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE);

    }

    @Override
    protected Destination openDestination(Session session) throws JMSException {
        return ((TopicSession) session).createTopic(destinationName);
    }

    @Override
    protected MessageConsumer openMessageConsumer(Session session, String selector) throws JMSException {
        if (selector != null) {
            return ((TopicSession) session).createConsumer((Topic) destination, selector);
        } else {
            return ((TopicSession) session).createConsumer((Topic) destination);
        }

    }

    public void dispose() {
        try {
            if (session != null) {
                session.close();
            }
        } catch (JMSException e) {
        }
        connection = null;
    }
}